package com.rem.flink.flink8State;

import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demo job for Flink's broadcast state pattern: a keyed stream of user actions is
 * connected with a broadcast stream of two-step behaviour patterns, and a
 * {@code (userId, pattern)} tuple is printed whenever a user's previous and current
 * action match the pattern currently held in broadcast state.
 *
 * <p>Only one pattern is kept at a time: every broadcast element overwrites the single
 * {@code null}-keyed entry of the broadcast map state.
 *
 * <p>NOTE(review): both inputs are bounded {@code fromElements} sources, so the printed
 * result presumably depends on the order in which patterns and actions reach the
 * operator - confirm before relying on a specific output.
 *
 * @author Rem
 * @date 2022-10-26
 */
public class BroadcastStateTest {

    /**
     * Single descriptor for the broadcast state, shared by the broadcast call and by both
     * callbacks of {@link PatternMatcher} so the state name and type information cannot
     * drift apart. The key type is {@code Void} because only one entry (key {@code null})
     * is ever stored.
     */
    private static final MapStateDescriptor<Void, Pattern> PATTERN_DESCRIPTOR =
            new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

    /**
     * Builds and runs the pipeline with parallelism 1.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Stream of user behaviour events.
        DataStreamSource<Action> actionStream = env.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "buy")
        );

        // Stream of behaviour patterns, i.e. the criteria to detect.
        DataStreamSource<Pattern> patternStream = env.fromElements(
                new Pattern("login", "pay"),
                new Pattern("login", "buy")
        );

        // Turn the pattern stream into a broadcast stream backed by the shared descriptor.
        BroadcastStream<Pattern> broadcast = patternStream.broadcast(PATTERN_DESCRIPTOR);

        actionStream.keyBy(a -> a.userId)
                .connect(broadcast)
                .process(new PatternMatcher())
                .print();

        env.execute();
    }

    /**
     * Compares each user's previous and current action against the pattern currently
     * stored in broadcast state and emits {@code (userId, pattern)} on a match.
     */
    private static class PatternMatcher
            extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {

        private static final long serialVersionUID = 1L;

        /** Keyed state holding the previous action of the current user. */
        private transient ValueState<String> lastActionState;

        /**
         * Obtains the keyed state handle from the runtime context.
         *
         * @param parameters configuration passed in by the runtime; not used here
         */
        @Override
        public void open(Configuration parameters) {
            lastActionState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        /**
         * Stores the incoming pattern in broadcast state, replacing any previous one.
         *
         * @param pattern the newly broadcast pattern
         * @param ctx     context giving write access to the broadcast state
         * @param out     output collector; nothing is emitted from this callback
         * @throws Exception if the broadcast state cannot be accessed or updated
         */
        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx,
                                            Collector<Tuple2<String, Pattern>> out) throws Exception {
            BroadcastState<Void, Pattern> patterns = ctx.getBroadcastState(PATTERN_DESCRIPTOR);
            // A single null-keyed entry: the latest pattern overwrites the previous one.
            patterns.put(null, pattern);
        }

        /**
         * Checks the (previous action, current action) pair of the current user against
         * the stored pattern, emits a match if both steps agree, then records the current
         * action as the new previous action.
         *
         * @param action the incoming user action
         * @param ctx    read-only context giving access to the broadcast state and current key
         * @param out    collector receiving {@code (userId, pattern)} on a match
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(Action action, ReadOnlyContext ctx,
                                   Collector<Tuple2<String, Pattern>> out) throws Exception {
            Pattern pattern = ctx.getBroadcastState(PATTERN_DESCRIPTOR).get(null);
            String prevAction = lastActionState.value();

            // A match needs both a stored pattern and a previously seen action for this user.
            boolean comparable = pattern != null && prevAction != null;
            if (comparable
                    && pattern.action1.equals(prevAction)
                    && pattern.action2.equals(action.action)) {
                out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
            }

            // Always remember the current action for the next comparison.
            lastActionState.update(action.action);
        }
    }


    /**
     * User behaviour event: which user performed which action.
     *
     * <p>Public fields and the no-argument constructor are kept as-is; presumably they are
     * what lets Flink handle this class as a POJO - verify before changing them.
     */
    public static class Action {
        public String userId;
        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId=" + userId +
                    ", action='" + action + '\'' +
                    '}';
        }
    }

    /**
     * Behaviour pattern made of two actions that must occur one after the other.
     *
     * <p>Used with {@code Types.POJO(Pattern.class)}, so the public fields and the
     * no-argument constructor are kept as-is.
     */
    public static class Pattern {
        public String action1;
        public String action2;

        public Pattern() {
        }

        public Pattern(String action1, String action2) {
            this.action1 = action1;
            this.action2 = action2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "action1='" + action1 + '\'' +
                    ", action2='" + action2 + '\'' +
                    '}';
        }
    }
}
